/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
 * 
 * @param <T> The type of elements produced by the fetcher.
 */
public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {

	private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);

	// ------------------------------------------------------------------------

	/** The schema to convert between Kafka's byte messages, and Flink's objects */
	private final KeyedDeserializationSchema<T> deserializer;

	/** The configuration for the Kafka consumer */
	private final Properties kafkaProperties;

	/** The maximum number of milliseconds to wait for a fetch batch */
	private final long pollTimeout;

	/** The next offsets that the main thread should commit */
	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
	
	/** The callback invoked by Kafka once an offset commit is complete */
	private final OffsetCommitCallback offsetCommitCallback;

	/** Reference to the Kafka consumer, once it is created */
	private volatile KafkaConsumer<byte[], byte[]> consumer;
	
	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
	private volatile ExceptionProxy errorHandler;

	/** Flag to mark the main work loop as alive */
	private volatile boolean running = true;

	/** Flag tracking whether the latest commit request has completed */
	private volatile boolean commitInProgress;

	/** For Debug output **/
	private String taskNameWithSubtasks;

	/** We get this from the outside to publish metrics. **/
	private MetricGroup metricGroup;

	// ------------------------------------------------------------------------

	public Kafka09Fetcher(
			SourceContext<T> sourceContext,
			List<KafkaTopicPartition> assignedPartitions,
			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
			ProcessingTimeService processingTimeProvider,
			long autoWatermarkInterval,
			ClassLoader userCodeClassLoader,
			boolean enableCheckpointing,
			String taskNameWithSubtasks,
			MetricGroup metricGroup,
			KeyedDeserializationSchema<T> deserializer,
			Properties kafkaProperties,
			long pollTimeout,
			boolean useMetrics) throws Exception
	{
		super(
				sourceContext,
				assignedPartitions,
				watermarksPeriodic,
				watermarksPunctuated,
				processingTimeProvider,
				autoWatermarkInterval,
				userCodeClassLoader,
				useMetrics);

		this.deserializer = deserializer;
		this.kafkaProperties = kafkaProperties;
		this.pollTimeout = pollTimeout;
		this.nextOffsetsToCommit = new AtomicReference<>();
		this.offsetCommitCallback = new CommitCallback();
		this.taskNameWithSubtasks = taskNameWithSubtasks;
		this.metricGroup = metricGroup;

		// if checkpointing is enabled, we are not automatically committing to Kafka.
		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
				Boolean.toString(!enableCheckpointing));
	}

	// ------------------------------------------------------------------------
	//  Fetcher work methods
	// ------------------------------------------------------------------------

	@Override
	public void runFetchLoop() throws Exception {
		this.errorHandler = new ExceptionProxy(Thread.currentThread());

		// rather than running the main fetch loop directly here, we spawn a dedicated thread
		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
		Thread runner = new Thread(this, getFetcherName() + " for " + taskNameWithSubtasks);
		runner.setDaemon(true);
		runner.start();

		try {
			runner.join();
		} catch (InterruptedException e) {
			// may be the result of a wake-up after an exception. we ignore this here and only
			// restore the interruption state
			Thread.currentThread().interrupt();
		}

		// make sure we propagate any exception that occurred in the concurrent fetch thread,
		// before leaving this method
		this.errorHandler.checkAndThrowException();
	}

	@Override
	public void cancel() {
		// flag the main thread to exit
		running = false;

		// NOTE:
		//   - We cannot interrupt the runner thread, because the Kafka consumer may
		//     deadlock when the thread is interrupted while in certain methods
		//   - We cannot call close() on the consumer, because it will actually throw
		//     an exception if a concurrent call is in progress

		// make sure the consumer finds out faster that we are shutting down 
		if (consumer != null) {
			consumer.wakeup();
		}
	}

	@Override
	public void run() {
		// This method initializes the KafkaConsumer and guarantees it is torn down properly.
		// This is important, because the consumer has multi-threading issues,
		// including concurrent 'close()' calls.

		final KafkaConsumer<byte[], byte[]> consumer;
		try {
			consumer = new KafkaConsumer<>(kafkaProperties);
		}
		catch (Throwable t) {
			running = false;
			errorHandler.reportError(t);
			return;
		}

		// from here on, the consumer will be closed properly
		try {
			assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions()));


			if (useMetrics) {
				final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
				addOffsetStateGauge(kafkaMetricGroup);
				// register Kafka metrics to Flink
				Map<MetricName, ? extends Metric> metrics = consumer.metrics();
				if (metrics == null) {
					// MapR's Kafka implementation returns null here.
					LOG.info("Consumer implementation does not support metrics");
				} else {
					// we have Kafka metrics, register them
					for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
						kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
					}
				}
			}

			// seek the consumer to the initial offsets
			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
				if (partition.isOffsetDefined()) {
					LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
						"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);

					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
				} else {
					// for partitions that do not have offsets restored from a checkpoint/savepoint,
					// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
					// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint

					long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());

					LOG.info("Partition {} has no initial offset; the consumer has position {}, so the initial offset " +
						"will be set to {}", partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);

					// the fetched offset represents the next record to process, so we need to subtract it by 1
					partition.setOffset(fetchedOffset - 1);
				}
			}

			// from now on, external operations may call the consumer
			this.consumer = consumer;

			// main fetch loop
			while (running) {

				// check if there is something to commit
				final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
				if (toCommit != null && !commitInProgress) {
					// reset the work-to-be committed, so we don't repeatedly commit the same
					// also record that a commit is already in progress
					commitInProgress = true;
					consumer.commitAsync(toCommit, offsetCommitCallback);
				}

				// get the next batch of records
				final ConsumerRecords<byte[], byte[]> records;
				try {
					records = consumer.poll(pollTimeout);
				}
				catch (WakeupException we) {
					continue;
				}

				// get the records for each topic partition
				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
					
					List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());

					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
						T value = deserializer.deserialize(
								record.key(), record.value(),
								record.topic(), record.partition(), record.offset());

						if (deserializer.isEndOfStream(value)) {
							// end of stream signaled
							running = false;
							break;
						}

						// emit the actual record. this also update offset state atomically
						// and deals with timestamps and watermark generation
						emitRecord(value, partition, record.offset(), record);
					}
				}
			}
			// end main fetch loop
		}
		catch (Throwable t) {
			if (running) {
				running = false;
				errorHandler.reportError(t);
			} else {
				LOG.debug("Stopped ConsumerThread threw exception", t);
			}
		}
		finally {
			try {
				consumer.close();
			}
			catch (Throwable t) {
				LOG.warn("Error while closing Kafka 0.9 consumer", t);
			}
		}
	}

	// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
		emitRecord(record, partition, offset, Long.MIN_VALUE);
	}
	/**
	 * Protected method to make the partition assignment pluggable, for different Kafka versions.
	 */
	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
		consumer.assign(topicPartitions);
	}

	protected String getFetcherName() {
		return "Kafka 0.9 Fetcher";
	}

	// ------------------------------------------------------------------------
	//  Kafka 0.9 specific fetcher behavior
	// ------------------------------------------------------------------------

	@Override
	public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
		return new TopicPartition(partition.getTopic(), partition.getPartition());
	}

	@Override
	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);

		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
			if (lastProcessedOffset != null) {
				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
				// This does not affect Flink's checkpoints/saved state.
				long offsetToCommit = lastProcessedOffset + 1;

				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
				partition.setCommittedOffset(offsetToCommit);
			}
		}

		// record the work to be committed by the main consumer thread and make sure the consumer notices that
		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
			LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
					"This does not compromise Flink's checkpoint integrity.");
		}
		if (consumer != null) {
			consumer.wakeup();
		}
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
		ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
		for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
			result.add(p.getKafkaPartitionHandle());
		}
		return result;
	}

	private class CommitCallback implements OffsetCommitCallback {

		@Override
		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
			commitInProgress = false;

			if (ex != null) {
				LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
			}
		}
	}
}
